#!/usr/bin/python3 -cimport os, sys; os.execv(os.path.dirname(sys.argv[1]) + "/../common/pywrap", sys.argv)

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <https://www.gnu.org/licenses/>.

import unittest

import storagelib
import testlib


class TestStorageResize(storagelib.StorageCase):

    # LUKS uses memory hard PBKDF, 1 GiB is not enough; see https://bugzilla.redhat.com/show_bug.cgi?id=1881829
    provision = {
        "0": {"memory_mb": 1536}
    }

    def checkResize(self, fsys, crypto, can_shrink, can_grow, shrink_needs_unmount=None, grow_needs_unmount=None):
        m = self.machine
        b = self.browser

        need_passphrase = crypto and self.default_crypto_type == "luks2"
        filesystem_desc = f"{fsys} filesystem"
        filesystem_desc_enc = filesystem_desc
        if crypto:
            filesystem_desc_enc = filesystem_desc + " (encrypted)"

        self.login_and_go("/storage")
        disk = m.add_disk("500M", serial="DISK1")
        b.wait_visible(self.card_row("Storage", name=f"/dev/{disk['dev']}"))

        m.execute("vgcreate TEST /dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_DISK1")
        b.wait_visible(self.card_row("Storage", name="TEST"))

        m.execute("lvcreate TEST -n vol -L 320m")  # minimum xfs size is ~300MB
        b.wait_visible(self.card_row("Storage", name="vol"))

        self.click_card_row("Storage", name="TEST")
        b.wait_visible(self.card("LVM2 volume group"))
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 1), "vol")

        self.click_dropdown(self.card_row("LVM2 logical volumes", 1), "Format")
        self.dialog_wait_open()
        self.dialog_wait_apply_enabled()
        self.dialog_set_val("name", "FSYS")
        self.dialog_set_val("type", fsys)
        self.dialog_set_val("mount_point", "/run/foo")
        if crypto:
            self.dialog_set_val("crypto", self.default_crypto_type)
            self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_set_val("passphrase2", "vainu-reku-toma-rolle-kaja")
        self.dialog_apply()
        with b.wait_timeout(60):
            self.dialog_wait_close()

        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), filesystem_desc_enc)
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 3), "/run/foo")

        if can_grow:
            b.click(self.card_row_col("LVM2 logical volumes", 1, 1))
            b.click(self.card_button("LVM2 logical volume", "Grow"))
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            if grow_needs_unmount:
                b.wait_in_text("#dialog", "unmount, grow")
            self.dialog_set_val("size", 400)
            if need_passphrase:
                self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            self.dialog_wait_close()
            b.wait_in_text(self.card_desc("LVM2 logical volume", "Size"), "398 MB")
            with b.wait_timeout(30):
                self.wait_mounted(filesystem_desc)
            size = int(m.execute("df -k --output=size /run/foo | tail -1").strip())
            self.assertGreater(size, 300000)
        else:
            self.wait_card_button_disabled("LVM2 logical volume", "Grow")

        if can_shrink:
            b.click(self.card_button("LVM2 logical volume", "Shrink"))
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            if shrink_needs_unmount:
                b.wait_in_text("#dialog", "unmount, shrink")
            self.dialog_set_val("size", 200)
            if need_passphrase:
                self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            self.dialog_wait_close()
            b.wait_in_text(self.card_desc("LVM2 logical volume", "Size"), "201 MB")
            self.wait_mounted(filesystem_desc)
            size = int(m.execute("df -k --output=size /run/foo | tail -1").strip())
            self.assertLess(size, 300000)
        else:
            self.wait_card_button_disabled("LVM2 logical volume", "Shrink")

    def testResizeExt4(self):
        self.checkResize("ext4", crypto=False,
                         can_shrink=True, shrink_needs_unmount=True,
                         can_grow=True, grow_needs_unmount=False)

    def testResizeXfs(self):
        self.checkResize("xfs", crypto=False,
                         can_shrink=False,
                         can_grow=True, grow_needs_unmount=False)

    @testlib.skipImage("NTFS not supported on RHEL", "rhel-*", "centos-*")
    def testResizeNtfs(self):
        self.checkResize("ntfs", crypto=False,
                         can_shrink=True, shrink_needs_unmount=True,
                         can_grow=True, grow_needs_unmount=True)

    @testlib.skipImage("TODO: arch does not mount the LUKS partition after growing", "arch")
    def testResizeLuks(self):
        self.checkResize("ext4", crypto=True,
                         can_shrink=True, shrink_needs_unmount=True,
                         can_grow=True, grow_needs_unmount=False)

    def shrink_extfs(self, fs_dev, size):
        # fsadm can automatically unmount and check the fs when doing
        # a resize, but in that case it will try to remount it
        # afterwards.  This remounting will mostly fail because
        # UDisks2 has removed the mount point directory in the mean
        # time.  But sometimes it will succeed.  So we take control
        # and unmount the fs explicitly.  But then we also need to
        # check it explicitly.
        #
        self.machine.execute(f"(! findmnt -S '{fs_dev}' || umount '{fs_dev}'); fsadm -y check '{fs_dev}'; fsadm -y resize '{fs_dev}' '{size}'; udevadm trigger")

    def testGrowShrinkHelp(self):
        m = self.machine
        b = self.browser

        if self.storaged_version < [2, 7, 6]:
            # No Filesystem.Size property
            raise unittest.SkipTest("UDisks2 too old")

        self.login_and_go("/storage")
        disk = m.add_disk("500M", serial="DISK1")
        b.wait_visible(self.card_row("Storage", name=f"/dev/{disk['dev']}"))

        m.execute("vgcreate TEST /dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_DISK1")
        b.wait_visible(self.card_row("Storage", name="TEST"))

        m.execute("lvcreate TEST -n vol -L 200m")
        b.wait_visible(self.card_row("Storage", name="vol"))

        self.click_card_row("Storage", name="TEST")
        b.wait_visible(self.card("LVM2 volume group"))
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 1), "vol")

        mountpoint = "/run/foo"
        self.click_dropdown(self.card_row("LVM2 logical volumes", 1), "Format")
        self.dialog_wait_open()
        self.dialog_set_val("name", "FSYS")
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("mount_point", mountpoint)
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), "ext4 filesystem")
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 3), mountpoint)

        # Click on the card to check the label
        b.click(self.card_row_col("LVM2 logical volumes", 1, 1))
        b.wait_text(self.card_desc("ext4 filesystem", "Name"), "FSYS")

        # Grow the logical volume and let Cockpit grow the filesystem
        m.execute("lvresize TEST/vol -L+100M")
        b.click(self.card_button("LVM2 logical volume", "Grow content"))
        b.wait_not_present(self.card_button("LVM2 logical volume", "Grow content"))
        size = int(m.execute(f"df -k --output=size {mountpoint} | tail -1").strip())
        self.assertGreater(size, 250000)

        # Shrink the filesystem and let Cockpit shrink the logical volume

        fs_dev = m.execute("lsblk -pnl /dev/TEST/vol -o NAME | tail -1").strip()
        self.shrink_extfs(fs_dev, "200M")

        b.click(self.card_button("ext4 filesystem", "Mount now"))
        self.wait_mounted("ext4 filesystem")
        b.click(self.card_button("LVM2 logical volume", "Shrink volume"))
        b.wait_not_present(self.card_button("LVM2 logical volume", "Shrink volume"))
        size = int(m.execute("lvs TEST/vol -o lv_size --noheading --units b --nosuffix"))
        self.assertLess(size, 250000000)

    @testlib.skipImage("cryptsetup uses too much memory, OOM on our test VMs", "rhel-8-*")
    def testGrowShrinkEncryptedHelp(self):
        m = self.machine
        b = self.browser

        if self.storaged_version < [2, 8, 0]:
            # No Encrypted.MetadataSize property
            raise unittest.SkipTest("UDisks2 too old")

        self.login_and_go("/storage")
        disk = m.add_disk("500M", serial="DISK1")
        b.wait_visible(self.card_row("Storage", name=f"/dev/{disk['dev']}"))

        m.execute("vgcreate TEST /dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_DISK1")
        b.wait_visible(self.card_row("Storage", name="TEST"))

        m.execute("lvcreate TEST -n vol -L 200m")
        b.wait_visible(self.card_row("Storage", name="vol"))

        self.click_card_row("Storage", name="TEST")
        b.wait_visible(self.card("LVM2 volume group"))
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 1), "vol")

        mountpoint = "/run/foo"
        self.click_dropdown(self.card_row("LVM2 logical volumes", 1), "Format")
        self.dialog_wait_open()
        self.dialog_set_val("name", "FSYS")
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("crypto", self.default_crypto_type)
        self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("passphrase2", "vainu-reku-toma-rolle-kaja")
        self.dialog_set_val("mount_point", mountpoint)
        self.dialog_apply()
        self.dialog_wait_close()
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), "ext4 filesystem (encrypted)")
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 3), mountpoint)

        # Grow the logical volume and let Cockpit grow the LUKS container and the filesystem
        m.execute("lvresize TEST/vol -L+100M")

        def confirm_with_passphrase():
            if self.default_crypto_type == "luks1":
                return
            self.dialog_wait_open()
            self.dialog_wait_apply_enabled()
            self.dialog_set_val("passphrase", "vainu-reku-toma-rolle-kaja")
            self.dialog_apply()
            self.dialog_wait_close()

        b.click(self.card_row_col("LVM2 logical volumes", 1, 1))
        b.click(self.card_button("LVM2 logical volume", "Grow content"))
        confirm_with_passphrase()
        b.wait_not_present(self.card_button("LVM2 logical volume", "Grow content"))
        size = int(m.execute(f"df -k --output=size {mountpoint} | tail -1").strip())
        self.assertGreater(size, 250000)

        # Shrink the filesystem and let Cockpit shrink the LUKS container and logical volume

        fs_dev = m.execute("lsblk -pnl /dev/TEST/vol -o NAME | tail -1").strip()
        self.shrink_extfs(fs_dev, "200M")
        b.click(self.card_button("ext4 filesystem", "Mount now"))
        self.wait_mounted("ext4 filesystem")

        b.click(self.card_button("LVM2 logical volume", "Shrink volume"))
        confirm_with_passphrase()
        b.wait_not_present(self.card_button("LVM2 logical volume", "Shrink volume"))
        size = int(m.execute("lvs TEST/vol -o lv_size --noheading --units b --nosuffix"))
        self.assertLess(size, 250000000)

        # Grow the logical volume and the LUKS container and let Cockpit grow the filesystem

        m.execute("lvresize TEST/vol -L+100M")
        m.execute(f"echo vainu-reku-toma-rolle-kaja | cryptsetup resize {fs_dev}")

        b.click(self.card_button("LVM2 logical volume", "Grow content"))
        confirm_with_passphrase()
        b.wait_not_present(self.card_button("LVM2 logical volume", "Grow volume"))
        size = int(m.execute(f"df -k --output=size {mountpoint} | tail -1").strip())
        self.assertGreater(size, 250000)

        # Shrink the filesystem and the LUKS container and let Cockpit shrink the logical volume
        self.shrink_extfs(fs_dev, "198M")
        m.execute(f"echo vainu-reku-toma-rolle-kaja | cryptsetup resize '{fs_dev}' 200M")
        b.click(self.card_button("ext4 filesystem", "Mount now"))
        self.wait_mounted("ext4 filesystem")
        b.click(self.card_button("LVM2 logical volume", "Shrink volume"))
        confirm_with_passphrase()
        b.wait_not_present(self.card_button("LVM2 logical volume", "Shrink volume"))
        size = int(m.execute("lvs TEST/vol -o lv_size --noheading --units b --nosuffix"))
        self.assertLess(size, 250000000)

    def testUnsupported(self):
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")
        b.wait_visible(self.card("Storage"))
        disk = m.add_disk("500M", serial="DISK1")
        b.wait_visible(self.card_row("Storage", name=f"/dev/{disk['dev']}"))

        m.execute("vgcreate TEST /dev/disk/by-id/scsi-0QEMU_QEMU_HARDDISK_DISK1; udevadm trigger")
        b.wait_visible(self.card_row("Storage", name="TEST"))

        m.execute("lvcreate TEST -n vol -L 320m")
        b.wait_visible(self.card_row("Storage", name="vol"))

        self.click_card_row("Storage", name="TEST")
        b.wait_visible(self.card("LVM2 volume group"))
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 1), "vol")

        m.execute("mkfs.ext4 -L FSYS /dev/TEST/vol")
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), "ext4 filesystem")

        def fake_fstype(fstype):
            # Change fstype via a udev rule
            m.write("/run/udev/rules.d/99-fake-fstype.rules",
                    f'SUBSYSTEM=="block", ENV{{ID_FS_LABEL}}=="FSYS", ENV{{ID_FS_TYPE}}="{fstype}"\n')
            m.execute("udevadm control --reload; udevadm trigger")

        self.addCleanup(m.execute,
                        "rm /run/udev/rules.d/99-fake-fstype.rules; udevadm control --reload; udevadm trigger")

        def check_btn(title, excuse):
            btn = self.card_button("LVM2 logical volume", title)
            b.wait_visible(btn)
            b.mouse(btn, "mouseenter", 0, 0, 0)
            b.wait_in_text("#tip-storage", excuse)
            b.mouse(btn, "mouseleave", 0, 0, 0)
            b.wait_not_present("#tip-storage")

        fake_fstype("udf")  # UDF is a real filesystem type that UDisks knows about. It can definitely not be resized.
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), "udf filesystem")
        b.click(self.card_row_col("LVM2 logical volumes", 1, 1))
        check_btn("Shrink", "udf can not be resized")
        check_btn("Grow", "udf can not be resized")
        b.click(".pf-v5-c-breadcrumb__link:contains('TEST')")

        fake_fstype("fake")  # This is not a real filesystem and UDisks2 knows nothing about it.
        b.wait_text(self.card_row_col("LVM2 logical volumes", 1, 2), "fake filesystem")
        b.click(self.card_row_col("LVM2 logical volumes", 1, 1))
        check_btn("Shrink", "fake can not be resized here")
        check_btn("Grow", "fake can not be resized here")
        b.click(".pf-v5-c-breadcrumb__link:contains('TEST')")


if __name__ == '__main__':
    testlib.test_main()
